/*
 * Unidata Platform Community Edition
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 * This file is part of the Unidata Platform Community Edition software.
 *
 * Unidata Platform Community Edition is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Unidata Platform Community Edition is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

package org.unidata.mdm.data.service.segments.records.delete;

import static org.unidata.mdm.meta.type.search.RecordHeaderField.FIELD_ETALON_ID;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.unidata.mdm.core.service.MetaModelService;
import org.unidata.mdm.core.type.data.CodeAttribute;
import org.unidata.mdm.core.type.data.DataRecord;
import org.unidata.mdm.core.type.data.RecordStatus;
import org.unidata.mdm.core.type.model.AttributeElement;
import org.unidata.mdm.core.type.model.EntityElement;
import org.unidata.mdm.core.type.model.LookupElement;
import org.unidata.mdm.core.type.model.RegisterElement;
import org.unidata.mdm.core.type.timeline.Timeline;
import org.unidata.mdm.core.util.SecurityUtils;
import org.unidata.mdm.data.context.DeleteRequestContext;
import org.unidata.mdm.data.context.GetRelationsTimelineRequestContext;
import org.unidata.mdm.data.context.RecordIdentityContextSupport;
import org.unidata.mdm.data.exception.DataConsistencyException;
import org.unidata.mdm.data.exception.DataExceptionIds;
import org.unidata.mdm.data.module.DataModule;
import org.unidata.mdm.data.service.impl.CommonRelationsComponent;
import org.unidata.mdm.data.service.segments.TimelineSelectionSupport;
import org.unidata.mdm.data.type.data.EtalonRecord;
import org.unidata.mdm.data.type.data.OriginRelation;
import org.unidata.mdm.data.type.data.RelationType;
import org.unidata.mdm.data.type.keys.RecordKeys;
import org.unidata.mdm.data.type.keys.RelationKeys;
import org.unidata.mdm.meta.configuration.Descriptors;
import org.unidata.mdm.meta.type.search.EntityIndexType;
import org.unidata.mdm.meta.type.search.RecordHeaderField;
import org.unidata.mdm.search.context.SearchRequestContext;
import org.unidata.mdm.search.context.TermsAggregationRequestContext;
import org.unidata.mdm.search.dto.SearchResultDTO;
import org.unidata.mdm.search.service.SearchService;
import org.unidata.mdm.search.type.form.FieldsGroup;
import org.unidata.mdm.search.type.form.FormField;
import org.unidata.mdm.search.type.query.SearchQuery;
import org.unidata.mdm.search.util.SearchUtils;
import org.unidata.mdm.system.exception.ValidationResult;
import org.unidata.mdm.system.type.pipeline.Point;
import org.unidata.mdm.system.type.pipeline.Start;

/**
 * Pipeline point, which refuses to delete a record while other records still point to it.
 * <p>
 * The check runs only for etalon inactivation, period inactivation and wipe requests.
 * For lookup entities the search index is queried for records of referencing registers and lookups,
 * whose link attributes hold the code value(s) of the record being deleted.
 * For registers the incoming relation timelines are loaded and inspected.
 * If anything is found, a {@link DataConsistencyException} is thrown.
 */
@Component(RecordDeleteDataConsistencyExecutor.SEGMENT_ID)
public class RecordDeleteDataConsistencyExecutor extends Point<DeleteRequestContext>
        implements RecordIdentityContextSupport, TimelineSelectionSupport<DeleteRequestContext, EtalonRecord> {
    /**
     * This segment ID.
     */
    public static final String SEGMENT_ID = DataModule.MODULE_ID + "[RECORD_DELETE_DATA_CONSISTENCY]";
    /**
     * Localized message code.
     */
    public static final String SEGMENT_DESCRIPTION = DataModule.MODULE_ID + ".record.delete.data.consistency.description";
    /**
     * Aggregation name
     */
    private static final String AGGR_NAME = "AGGR_NAME";
    /**
     * Search service.
     */
    @Autowired
    private SearchService searchService;
    /**
     * MetaModel service.
     */
    @Autowired
    private MetaModelService metaModelService;
    /**
     * Relations common component.
     */
    @Autowired
    private CommonRelationsComponent commonRelationsComponent;
    /**
     * Constructor.
     */
    public RecordDeleteDataConsistencyExecutor() {
        super(SEGMENT_ID, SEGMENT_DESCRIPTION);
    }

    /**
     * Runs the consistency check for the record addressed by the context keys.
     *
     * @param ctx the delete request context
     * @throws DataConsistencyException if the record is still referenced
     */
    @Override
    public void point(DeleteRequestContext ctx) {

        // Only etalon inactivation, etalon period inactivation and wipe are checked.
        RecordKeys keys = ctx.keys();
        if (!(ctx.isInactivateEtalon() || ctx.isInactivatePeriod() || ctx.isWipe())) {
            return;
        }

        String entityName = keys.getEntityName();

        EntityElement element = metaModelService.instance(Descriptors.DATA)
                .getElement(entityName);

        if (element.isLookup()) {
            processingAsLookup(element.getLookup(), ctx);
        } else {
            processingAsRegister(element.getRegister(), ctx, keys.getEtalonKey().getId());
        }
    }

    /**
     * Checks a lookup record for incoming links from registers and other lookups.
     *
     * @param le the lookup element of the record being deleted
     * @param ctx the delete request context
     * @throws DataConsistencyException if at least one referencing record was found
     */
    private void processingAsLookup(LookupElement le, DeleteRequestContext ctx) {

        String codeAttrName = le.getCodeAttribute().getName();
        DataRecord record = getCurrentEtalonRecord(ctx);

        // Fall back to the first non-null calculation result (etalon by validity period).
        if (record == null) {
            record = getFirstNonNullCalculationResult(ctx);
        }

        // No etalon could be obtained for the keys - nothing to check.
        if (record == null) {
            return;
        }

        // A lookup has exactly one code attribute.
        CodeAttribute<?> codeAttribute = record.getCodeAttribute(codeAttrName);

        // Without a code attribute there is no value the link search could match on;
        // bail out instead of failing with a NullPointerException further down.
        if (codeAttribute == null) {
            return;
        }

        FieldsGroup timeLineFormFields = timeLineFormField(ctx);

        Map<String, Long> linkedRecords = calculateLinkedRecords(le, timeLineFormFields, codeAttribute);
        if (linkedRecords.isEmpty()) {
            return;
        }

        throw new DataConsistencyException(
                "The etalon record has incoming links and cannot be deleted.",
                DataExceptionIds.EX_DATA_ETALON_HAS_INCOMING_LINKS,
                Collections.singletonList(incomingConnectionsResult(linkedRecords))
        );
    }

    /**
     * Checks a register record for incoming relations.
     * Period inactivation is not checked for registers.
     *
     * @param re the register element of the record being deleted
     * @param ctx the delete request context
     * @param etalonKey the etalon id of the record being deleted
     * @throws DataConsistencyException if at least one qualifying incoming relation was found
     */
    private void processingAsRegister(RegisterElement re, DeleteRequestContext ctx, String etalonKey) {

        if (ctx.isInactivatePeriod()) {
            return;
        }

        List<String> relationNames = re.getIncomingRelations().keySet().stream()
                .map(EntityElement::getName)
                .collect(Collectors.toList());

        if (CollectionUtils.isEmpty(relationNames)) {
            return;
        }

        Map<String, List<Timeline<OriginRelation>>> result
            = commonRelationsComponent.loadTimelines(GetRelationsTimelineRequestContext.builder()
                .etalonKey(etalonKey)
                .relationNames(relationNames)
                .fetchData(false)
                .fetchByToSide(true)
                .build());

        // CONTAINS relations are ignored; a relation counts only if both the relation itself
        // and its from side are active. The result is the number of such relations per relation name.
        Map<String, Long> collected = result.values().stream()
                .flatMap(Collection::stream)
                .map(Timeline::<RelationKeys>getKeys)
                .filter(k -> k.getRelationType() != RelationType.CONTAINS)
                .filter(k -> k.getEtalonKey().getStatus() == RecordStatus.ACTIVE
                     && k.getEtalonKey().getFrom().getStatus() == RecordStatus.ACTIVE)
                .collect(Collectors.groupingBy(RelationKeys::getRelationName, Collectors.counting()));

        if (collected.isEmpty()) {
            return;
        }

        throw new DataConsistencyException(
                "The etalon record has incoming relations and cannot be deleted.",
                DataExceptionIds.EX_DATA_ETALON_HAS_INCOMING_RELATIONS,
                Collections.singletonList(incomingConnectionsResult(collected))
        );
    }

    /**
     * Builds the validation result, listing the sources of incoming connections as "name : count" pairs.
     *
     * @param counts the number of incoming connections per source name
     * @return the validation result
     */
    private static ValidationResult incomingConnectionsResult(Map<String, Long> counts) {

        String details = counts.entrySet().stream()
                .map(entry -> StringUtils.join(entry.getKey(), " : ", entry.getValue().toString()))
                .collect(Collectors.joining(", "));

        return new ValidationResult("The etalon record has incoming relations and cannot be deleted. Connections from ({}).",
                (String) null,
                details);
    }

    /**
     * Counts the records, referencing the given code attribute, per referencing entity display name.
     *
     * @param le the lookup element being referenced
     * @param timeLineFormFields additional period restriction for the search, may be null
     * @param codeAttribute the code attribute of the record being deleted
     * @return map of entity display name to the number of linked records, empty if nothing was found
     */
    private Map<String, Long> calculateLinkedRecords(LookupElement le, FieldsGroup timeLineFormFields, CodeAttribute<?> codeAttribute) {

        Map<String, Long> linkedRecords = new HashMap<>();

        if (timeLineFormFields == null) {
            return linkedRecords;
        }

        // 1. Check Entity -> Lookup
        Map<RegisterElement, Set<AttributeElement>> entities = le.getReferencingRegisters();
        collectLinks(entities, codeAttribute, timeLineFormFields, linkedRecords);

        // 2. Check Lookup -> Lookup
        Map<LookupElement, Set<AttributeElement>> lookupEntities = le.getReferencingLookups();
        collectLinks(lookupEntities, codeAttribute, timeLineFormFields, linkedRecords);

        return linkedRecords;
    }

    /**
     * Runs the link search for every referencing entity and adds the hits to the target map.
     * Counts are summed per key, because display names are used as keys and
     * two different entities may share the same display name.
     *
     * @param referencing the referencing entities with their link attributes
     * @param codeAttribute the code attribute of the record being deleted
     * @param timeLineFormFields additional period restriction for the search
     * @param target the map to accumulate counts in
     */
    private void collectLinks(Map<? extends EntityElement, Set<AttributeElement>> referencing,
                              CodeAttribute<?> codeAttribute, FieldsGroup timeLineFormFields, Map<String, Long> target) {

        for (Map.Entry<? extends EntityElement, Set<AttributeElement>> e : referencing.entrySet()) {
            Pair<String, Long> links = findLinks(e, codeAttribute, timeLineFormFields);
            if (links != null) {
                target.merge(links.getKey(), links.getValue(), Long::sum);
            }
        }
    }

    /**
     * Searches a single referencing entity for not deleted, not inactive records,
     * whose link attributes hold the value (or one of the supplementary values) of the code attribute.
     *
     * @param e the referencing entity and its link attributes
     * @param codeAttribute the code attribute of the record being deleted
     * @param timeLineFormFields additional period restriction for the search
     * @return pair of entity display name and count, or null if the entity has no link attributes or nothing was found
     */
    @Nullable
    private Pair<String, Long> findLinks(Map.Entry<? extends EntityElement, Set<AttributeElement>> e,
                                         CodeAttribute<?> codeAttribute, FieldsGroup timeLineFormFields) {

        String linkedEntityName = e.getKey().getName();
        Set<AttributeElement> linkedAttrs = e.getValue();

        if (linkedAttrs.isEmpty()) {
            return null;
        }

        List<Object> values = new ArrayList<>();
        values.add(codeAttribute.getValue());
        if (codeAttribute.hasSupplementary()) {
            values.addAll(codeAttribute.getSupplementary());
        }

        // Code attributes, alternative code attributes and links to them
        // are expected to be indexed not analyzable.
        FieldsGroup baseForm = FieldsGroup.and(
                FormField.exact(RecordHeaderField.FIELD_DELETED, Boolean.FALSE),
                // FormField.exact(RecordHeaderField.FIELD_PUBLISHED, Boolean.TRUE),
                FormField.exact(RecordHeaderField.FIELD_INACTIVE, Boolean.FALSE))
                    .add(FieldsGroup.or(linkedAttrs.stream()
                        .map(linkedAttr -> FormField.exact(linkedAttr.getIndexed(), values))
                        .toArray(FormField[]::new)));

        TermsAggregationRequestContext taCtx = TermsAggregationRequestContext.builder()
                .name(AGGR_NAME)
                .path(FIELD_ETALON_ID.getName())
                .size(1)
                .build();

        // Only counts are needed: no hits are requested (count 0), just the total and the aggregation.
        SearchRequestContext ctx = SearchRequestContext.builder(EntityIndexType.RECORD, linkedEntityName, SecurityUtils.getCurrentUserStorageId())
                .totalCount(true)
                .skipEtalonId(true)
                .count(0)
                .page(0)
                .query(SearchQuery.formQuery(baseForm, timeLineFormFields))
                .aggregations(Collections.singletonList(taCtx))
                .build();

        SearchResultDTO result = searchService.search(ctx);

        // NOTE(review): presumably compensates for several hits (periods) of the same etalon id,
        // each returned bucket contributes (size - 1) to the correction - confirm the intent of size(1).
        long repeated = CollectionUtils.isNotEmpty(result.getAggregates())
                ? result.getAggregates().stream()
                        .flatMap(a -> a.getCountMap().values().stream().map(v -> v - 1))
                        .mapToLong(v -> v)
                        .sum()
                : 0;

        long count = result.getTotalCount() - repeated;
        if (count > 0) {
            return Pair.of(e.getKey().getDisplayName(), count);
        }

        return null;
    }

    /**
     * Builds the period restriction for the link search.
     * For anything but period inactivation an empty group is returned.
     * For period inactivation the group restricts FIELD_FROM by the upper bound 'validTo'
     * and FIELD_TO by the lower bound 'validFrom' of the request.
     *
     * @param ctx the delete request context
     * @return fields group, never null
     */
    private FieldsGroup timeLineFormField(DeleteRequestContext ctx) {

        if (!ctx.isInactivatePeriod()) {
            return FieldsGroup.and();
        }

        return FieldsGroup.and(
                FormField.range(RecordHeaderField.FIELD_FROM, null, SearchUtils.coalesceTo(ctx.getValidTo())),
                FormField.range(RecordHeaderField.FIELD_TO, SearchUtils.coalesceFrom(ctx.getValidFrom()), null));
    }

    /**
     * Tells whether this point can take part in a pipeline with the given start segment.
     *
     * @param start the start segment
     * @return true, if the start segment input type is a {@link DeleteRequestContext} or a subtype of it
     */
    @Override
    public boolean supports(Start<?, ?> start) {
        return DeleteRequestContext.class.isAssignableFrom(start.getInputTypeClass());
    }
}
